-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-47148][SQL] Avoid to materialize AQE ExchangeQueryStageExec on the cancellation #45234
Conversation
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
Should we also backport this patch to |
53dd089
to
a7a869f
Compare
cc @cloud-fan @maryannxue as well |
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
a7a869f
to
ef8c50e
Compare
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
355aeb0
to
a923c2a
Compare
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
867134b
to
edf95b3
Compare
c178f2e
to
d0e4127
Compare
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
aaafa1d
to
4378034
Compare
4378034
to
c17240b
Compare
Thanks @cloud-fan and @ulysses-you for the reviews and approval. |
Build is green now so PR is ready to be merged. Thanks in advance. |
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
ae77cbb
to
1105282
Compare
thanks, merging to master! |
… the cancellation ### What changes were proposed in this pull request? AQE can materialize both `ShuffleQueryStage` and `BroadcastQueryStage` on the cancellation. This causes unnecessary stage materialization by submitting Shuffle Job and Broadcast Job. Under normal circumstances, if the stage is already non-materialized (a.k.a `ShuffleQueryStage.shuffleFuture` or `BroadcastQueryStage.broadcastFuture` is not initialized yet), it should just be skipped without materializing it. **Problematic Stacktrace:** ``` at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:104) at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:210) at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:210) at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.cancel(QueryStageExec.scala:223) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$cleanUpAndThrowException$1(AdaptiveSparkPlanExec.scala:905) ``` Please find sample use-case: **1- Stage Materialization Steps:** When stage materialization is failed: ``` 1.1- ShuffleQueryStage1 - is materialized successfully, 1.2- ShuffleQueryStage2 - materialization is failed, 1.3- ShuffleQueryStage3 - Not materialized yet so ShuffleQueryStage3.shuffleFuture is not initialized yet ``` **2- Stage Cancellation Steps:** ``` 2.1- ShuffleQueryStage1 - is canceled due to already materialized, 2.2- ShuffleQueryStage2 - is earlyFailedStage so currently, it is skipped as default by AQE because it could not be materialized, 2.3- ShuffleQueryStage3 - Problem is here: This stage is not materialized yet but currently, it is also tried to cancel and this stage requires to be materialized first. ``` **Reproduce Steps:** https://github.com/apache/spark/pull/45234/files#diff-f89f2fe78b324c6bc7190bef84220181f3616efc156ea99b3f15d375a22d7f88R900 ### Why are the changes needed? Current logic introduces unnecessary Shuffle Job / Broadcast Job to be able to cancel `ShuffleQueryStage` / `BroadcastQueryStage`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added new Unit Tests ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#45234 from erenavsarogullari/SPARK-47148. Authored-by: erenavsarogullari <erenavsarogullari@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? A followup of #45234 to make the test more stable by using broadcast hint. ### Why are the changes needed? test improvement ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? N/A ### Was this patch authored or co-authored using generative AI tooling? no Closes #47007 from cloud-fan/follow. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: yangjie01 <yangjie01@baidu.com>
* such as waiting for the subqueries. | ||
*/ | ||
@transient private lazy val shuffleFuture: Future[MapOutputStatistics] = executeQuery { | ||
materializationStarted.set(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After a closer look, I don't think this change works as we expect. We set this materializationStarted
flag before we return the Future
, which means we are still on the AQE loop's main thread. That said, once we submit a query stage, its materializationStarted
becomes true immediately and we can't really avoid the wasted query stage execution when cancelling it.
The test passed because ShuffleExchangeExec
calls child.execute()
before returning the Future
. Then we exit the AQE loop without cancelling other stages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Further more, I don't think this idea works. Let's say when we want to cancel a query stage and the materializationStarted
flag is false, we decide to skip the cancellation but maybe the next second the materializationStarted
becomes true and we miss to cancel the shuffle job.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need a bit of synchronization here. The shuffle node should have two fields: isCancelled
flag and the shuffle job Future
.
- When we cancel a shuffle, we lock on the shuffle node, and set
isCancelled
flag to true. Then if the shuffle jobFuture
is present, we cancel it. - When we are going to submit a shuffle, we lock on the shuffle node. Then: if the
isCancelled
flag is true, fail immediately, otherwise, submit the shuffle job and set theFuture
field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems not an acutally issue for now. AQE always do materilize stage and cancel stage at main thread. So if we decide to cancel stage then that means we will never do materilize stage again. We may need improve this code if we support do materilize concurrently in future.
…ages ### What changes were proposed in this pull request? We missed the fact that submitting a shuffle or broadcast query stage can be heavy, as it needs to submit subqueries and wait for the results. This blocks the AQE loop and hurts the parallelism of AQE. This PR fixes the problem by using shuffle/broadcast's own thread pool to wait for subqueries and other preparations. This PR also re-implements #45234 to avoid submitting the shuffle job if the query is failed and all query stages need to be cancelled. ### Why are the changes needed? better parallelism for AQE ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new test case ### Was this patch authored or co-authored using generative AI tooling? no Closes #47533 from cloud-fan/aqe. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ages ### What changes were proposed in this pull request? We missed the fact that submitting a shuffle or broadcast query stage can be heavy, as it needs to submit subqueries and wait for the results. This blocks the AQE loop and hurts the parallelism of AQE. This PR fixes the problem by using shuffle/broadcast's own thread pool to wait for subqueries and other preparations. This PR also re-implements apache#45234 to avoid submitting the shuffle job if the query is failed and all query stages need to be cancelled. ### Why are the changes needed? better parallelism for AQE ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new test case ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#47533 from cloud-fan/aqe. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
AQE can materialize both
ShuffleQueryStage
andBroadcastQueryStage
on the cancellation. This causes unnecessary stage materialization by submitting Shuffle Job and Broadcast Job. Under normal circumstances, if the stage is already non-materialized (a.k.aShuffleQueryStage.shuffleFuture
orBroadcastQueryStage.broadcastFuture
is not initialized yet), it should just be skipped without materializing it.Problematic Stacktrace:
Please find sample use-case:
1- Stage Materialization Steps:
When stage materialization is failed:
2- Stage Cancellation Steps:
Reproduce Steps:
https://github.com/apache/spark/pull/45234/files#diff-f89f2fe78b324c6bc7190bef84220181f3616efc156ea99b3f15d375a22d7f88R900
Why are the changes needed?
Current logic introduces unnecessary Shuffle Job / Broadcast Job to be able to cancel
ShuffleQueryStage
/BroadcastQueryStage
.Does this PR introduce any user-facing change?
No
How was this patch tested?
Added new Unit Tests
Was this patch authored or co-authored using generative AI tooling?
No